/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler.dataimport;

import static org.apache.solr.handler.dataimport.DataImportHandlerException.SEVERE;
import static org.apache.solr.handler.dataimport.DataImportHandlerException.wrapAndThrow;
import itsm.isperp.framework.core.context.ContextHolder;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * <p>
 * A DataSource implementation which can fetch data using JDBC.
 * </p>
 * <p>
 * Refer to
 * <a href="http://wiki.apache.org/solr/DataImportHandler">http://wiki.apache.org/solr/DataImportHandler</a>
 * for more details.
 * </p>
 * <p>
 * <b>This API is experimental and may change in the future.</b>
 * </p>
 * 
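 * <p>
 * This variant does not open connections from a {@code driver}/{@code url}
 * pair; it borrows them from the Spring-managed {@code javax.sql.DataSource}
 * bean named {@code "dataSource"}, looked up through {@code ContextHolder}.
 * A minimal data-config entry might therefore look like the sketch below
 * (attribute values are illustrative, not verified defaults):
 * </p>
 * <pre>
 * &lt;dataSource type="JdbcDataSource" batchSize="500" convertType="true"/&gt;
 * </pre>
 *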
 * @since solr 1.3
 */
public class JdbcDataSource extends DataSource<Iterator<Map<String, Object>>> {
	private static final Logger LOG = LoggerFactory
			.getLogger(JdbcDataSource.class);

	protected Callable<Connection> factory;

	private long connLastUsed = 0;

	private Connection conn;

	private Map<String, Integer> fieldNameVsType = new HashMap<String, Integer>();

	private boolean convertType = false;

	private int batchSize = FETCH_SIZE;

	private int maxRows = 0;

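	/**
	 * Reads {@code convertType} and {@code batchSize} from the init
	 * properties, records a SQL type hint for each entity field, and
	 * builds the connection factory. A batchSize of -1 is stored as
	 * Integer.MIN_VALUE (the MySQL driver's row-streaming hint).
	 */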
	@Override
	public void init(Context context, Properties initProps) {
		Object o = initProps.get(CONVERT_TYPE);
		if (o != null)
			convertType = Boolean.parseBoolean(o.toString());

		factory = createConnectionFactory(context, initProps);

		String bsz = initProps.getProperty("batchSize");
		if (bsz != null) {
			bsz = context.replaceTokens(bsz);
			try {
				batchSize = Integer.parseInt(bsz);
				if (batchSize == -1)
					batchSize = Integer.MIN_VALUE;
			} catch (NumberFormatException e) {
				LOG.warn("Invalid batch size: " + bsz);
			}
		}

		for (Map<String, String> map : context.getAllEntityFields()) {
			String n = map.get(DataImporter.COLUMN);
			String t = map.get(DataImporter.TYPE);
			if ("sint".equals(t) || "integer".equals(t))
				fieldNameVsType.put(n, Types.INTEGER);
			else if ("slong".equals(t) || "long".equals(t))
				fieldNameVsType.put(n, Types.BIGINT);
			else if ("float".equals(t) || "sfloat".equals(t))
				fieldNameVsType.put(n, Types.FLOAT);
			else if ("double".equals(t) || "sdouble".equals(t))
				fieldNameVsType.put(n, Types.DOUBLE);
			else if ("date".equals(t))
				fieldNameVsType.put(n, Types.DATE);
			else if ("boolean".equals(t))
				fieldNameVsType.put(n, Types.BOOLEAN);
			else if ("binary".equals(t))
				fieldNameVsType.put(n, Types.BLOB);
			else
				fieldNameVsType.put(n, Types.VARCHAR);
		}
	}

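	/**
	 * Builds the factory used by {@link #getConnection()}. Unlike the
	 * stock Solr JdbcDataSource, it ignores driver/url/JNDI settings and
	 * obtains connections from the Spring bean named "dataSource".
	 */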
	protected Callable<Connection> createConnectionFactory(
			final Context context, final Properties initProps) {
		resolveVariables(context, initProps);

		String s = initProps.getProperty("maxRows");
		if (s != null) {
			maxRows = Integer.parseInt(s);
		}
		// Overridden factory: fetch connections from the Spring-managed DataSource
		return factory = new Callable<Connection>() {
			@Override
			public Connection call() throws Exception {
				LOG.info("Creating a connection for entity "
						+ context.getEntityAttribute(DataImporter.NAME));
				javax.sql.DataSource ds = ContextHolder
						.getSpringBean("dataSource");

				return ds.getConnection();
			}

		};
	}

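	/** Expands ${...} tokens in every init property value, in place. */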
	private void resolveVariables(Context ctx, Properties initProps) {
		for (Map.Entry<Object, Object> entry : initProps.entrySet()) {
			if (entry.getValue() != null) {
				entry.setValue(ctx.replaceTokens((String) entry.getValue()));
			}
		}
	}

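	/**
	 * Executes the query and returns a lazy iterator over the result
	 * rows; each row is read from the ResultSet only when requested.
	 */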
	@Override
	public Iterator<Map<String, Object>> getData(String query) {
		ResultSetIterator r = new ResultSetIterator(query);
		return r.getIterator();
	}

	private void logError(String msg, Exception e) {
		LOG.warn(msg, e);
	}

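	/** Returns the column labels (aliases) of the result set, in order. */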
	private List<String> readFieldNames(ResultSetMetaData metaData)
			throws SQLException {
		List<String> colNames = new ArrayList<String>();
		int count = metaData.getColumnCount();
		for (int i = 0; i < count; i++) {
			colNames.add(metaData.getColumnLabel(i + 1));
		}
		return colNames;
	}

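	/**
	 * Adapts a ResultSet to an Iterator of column-name/value maps. The
	 * statement and result set are closed once the last row is consumed.
	 */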
	private class ResultSetIterator {
		ResultSet resultSet;

		Statement stmt = null;

		List<String> colNames;

		Iterator<Map<String, Object>> rSetIterator;

		public ResultSetIterator(String query) {

			try {
				Connection c = getConnection();
				stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY,
						ResultSet.CONCUR_READ_ONLY);
				stmt.setFetchSize(batchSize);
				stmt.setMaxRows(maxRows);
				LOG.debug("Executing SQL: " + query);
				long start = System.nanoTime();
				if (stmt.execute(query)) {
					resultSet = stmt.getResultSet();
				}
				LOG.trace("Time taken for sql: "
						+ TimeUnit.MILLISECONDS.convert(System.nanoTime()
								- start, TimeUnit.NANOSECONDS) + " ms");
				// Guard against statements that return an update count
				// instead of a result set (resultSet stays null then).
				if (resultSet != null) {
					colNames = readFieldNames(resultSet.getMetaData());
				}
			} catch (Exception e) {
				wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
			}
			if (resultSet == null) {
				rSetIterator = new ArrayList<Map<String, Object>>().iterator();
				return;
			}

			rSetIterator = new Iterator<Map<String, Object>>() {
				@Override
				public boolean hasNext() {
					return hasnext();
				}

				@Override
				public Map<String, Object> next() {
					return getARow();
				}

				@Override
				public void remove() {
					/* do nothing */
				}
			};
		}

		private Iterator<Map<String, Object>> getIterator() {
			return rSetIterator;
		}

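		/**
		 * Copies the current row into a map. With convertType enabled the
		 * value is coerced via the type hint registered for the column;
		 * otherwise the driver's default object mapping is used.
		 */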
		private Map<String, Object> getARow() {
			if (resultSet == null)
				return null;
			Map<String, Object> result = new HashMap<String, Object>();
			for (String colName : colNames) {
				try {
					if (!convertType) {
						// Use underlying database's type information
						result.put(colName, resultSet.getObject(colName));
						continue;
					}

					Integer type = fieldNameVsType.get(colName);
					if (type == null)
						type = Types.VARCHAR;
					switch (type) {
					case Types.INTEGER:
						result.put(colName, resultSet.getInt(colName));
						break;
					case Types.FLOAT:
						result.put(colName, resultSet.getFloat(colName));
						break;
					case Types.BIGINT:
						result.put(colName, resultSet.getLong(colName));
						break;
					case Types.DOUBLE:
						result.put(colName, resultSet.getDouble(colName));
						break;
					case Types.DATE:
						result.put(colName, resultSet.getTimestamp(colName));
						break;
					case Types.BOOLEAN:
						result.put(colName, resultSet.getBoolean(colName));
						break;
					case Types.BLOB:
						result.put(colName, resultSet.getBytes(colName));
						break;
					default:
						result.put(colName, resultSet.getString(colName));
						break;
					}
				} catch (SQLException e) {
					logError("Error reading data ", e);
					wrapAndThrow(SEVERE, e, "Error reading data from database");
				}
			}
			return result;
		}

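		/**
		 * Advances the cursor, closing the statement and result set when
		 * the rows are exhausted or a SQLException occurs.
		 */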
		private boolean hasnext() {
			if (resultSet == null)
				return false;
			try {
				if (resultSet.next()) {
					return true;
				} else {
					close();
					return false;
				}
			} catch (SQLException e) {
				close();
				wrapAndThrow(SEVERE, e);
				return false;
			}
		}

		private void close() {
			try {
				if (resultSet != null)
					resultSet.close();
				if (stmt != null)
					stmt.close();
			} catch (Exception e) {
				logError("Exception while closing result set", e);
			} finally {
				resultSet = null;
				stmt = null;
			}
		}
	}

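	/**
	 * Returns the cached connection, replacing it once it has sat idle
	 * longer than CONN_TIME_OUT. Timestamps come from System.nanoTime(),
	 * so the timeout constant must be expressed in nanoseconds.
	 */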
	private Connection getConnection() throws Exception {
		long currTime = System.nanoTime();
		if (currTime - connLastUsed > CONN_TIME_OUT) {
			synchronized (this) {
				Connection tmpConn = factory.call();
				closeConnection();
				connLastUsed = System.nanoTime();
				return conn = tmpConn;
			}

		} else {
			connLastUsed = currTime;
			return conn;
		}
	}

	@Override
	protected void finalize() throws Throwable {
		try {
			if (!isClosed) {
				LOG.error("JdbcDataSource was not closed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");
				close();
			}
		} finally {
			super.finalize();
		}
	}

	private boolean isClosed = false;

	@Override
	public void close() {
		try {
			closeConnection();
		} finally {
			isClosed = true;
		}
	}

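	/**
	 * Commits pending work (see SOLR-2045), ignoring commit failures,
	 * then closes the connection; close errors are logged, never
	 * propagated.
	 */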
	private void closeConnection() {
		try {
			if (conn != null) {
				try {
					// SOLR-2045
					conn.commit();
				} catch (Exception ex) {
					// ignore.
				}
				conn.close();
			}
		} catch (Exception e) {
			LOG.error("Ignoring Error when closing connection", e);
		}
	}

	private static final long CONN_TIME_OUT = TimeUnit.NANOSECONDS.convert(
			10, TimeUnit.SECONDS); // 10 seconds, in nanoTime() units

	private static final int FETCH_SIZE = 500;

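	// URL, JNDI_NAME and DRIVER are retained for configuration
	// compatibility with the stock Solr JdbcDataSource; this variant does
	// not read them because connections come from the Spring bean.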
	public static final String URL = "url";

	public static final String JNDI_NAME = "jndiName";

	public static final String DRIVER = "driver";

	public static final String CONVERT_TYPE = "convertType";
}
